AthenaのクエリでWHERE句に関数を利用してもパーティションを効率よく走査してくれるか確認してみた

AthenaのクエリでWHERE句に関数を利用してもパーティションを効率よく走査してくれるか確認してみた

Clock Icon2024.06.17

CX事業本部@大阪の岩田です。

年(year)と月(month)にパーティションが切られたAthenaのテーブルに対して直近1週間のデータを集計するクエリを書く機会がありました。直近1週間ということで月跨ぎのケースを考慮して最初は以下のようにWHERE句を書いていました。

WHERE
  -- パーティションによる絞り込み
	(
		(year = '2023' AND month = '12')
			OR
		(year = '2024' AND month = '01')
	)
  AND -- さらに直近1週間のデータに絞り込み
  AND -- その他の条件1
  AND -- その他の条件2
  AND -- その他の条件3

しかし、WHERE句の条件が増えてくると最初にパーティションを絞り込む部分がゴチャゴチャしてくるのが気になり以下のような書き換えを検討しました。

CONCAT(year, month) BETWEEN '202312' AND '202401'

ここで1つの疑問が浮かびました。

パーティションが設定されたカラムに対してCONCAT等の関数を適用した場合でもAtehanaのクエリエンジンは当該パーティションだけを効率よく走査してくれるのでしょうか?一応prestroについて調べると以下の情報がヒットしたのでAthenaでもパーティションを考慮していい感じにフィルタしてくれそうではあります。

ということで実際にいくつかのクエリを実行しながら確認してみました。

環境

今回利用した環境です

  • Athena engine version 3
  • パーティション管理: Glue Data Catalog

以前こちらのLTで利用したSlackの投稿を分析対象としてSQLを発行していきます

Glueのマネコンから確認できるテーブルのパーティションは以下の通りでした

やってみる

ここから色々なEXPLAIN ANALYZEを付けながら色々なSQLを実行し、出力を確認していきます。

とりあえず全件スキャン

まずは全件スキャン時の実行計画を確認しておきます。本来は「Athena破産」のリスクがあるので避けるべきSQLですが、まあ対したデータ量じゃないことが分かっているので気にせずスキャンしちゃいます。

explain analyze
select count(*) year,
	month
from cm_iwata_athena_test
group by year,
	month

実行結果です

Query Plan
Queued: 210.84us, Analysis: 189.20ms, Planning: 523.73ms, Execution: 1.48s
Fragment 1 [HASH]
    CPU: 9.21ms, Scheduled: 9.45ms, Blocked 6.66s (Input: 3.33s, Output: 0.00ns), Input: 372 rows (12.35kB), Data Scanned: 0B; per task: avg.: 372.00 std.dev.: 0.00, Output: 24 rows (384B)
    Output layout: [month, count]
    Output partitioning: SINGLE []
    Project[projectLocality = LOCAL, protectedBarrier = NONE]
    │   Layout: [month:varchar, count:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
    │   CPU: 1.00ms (0.01%), Scheduled: 1.00ms (0.00%), Blocked: 0.00ns (0.00%), Output: 24 rows (384B)
    │   Input avg.: 6.00 rows, Input std.dev.: 37.27%
    └─ Aggregate[type = FINAL, keys = [year, month], hash = [$hashvalue]]
       │   Layout: [year:varchar, month:varchar, $hashvalue:bigint, count:bigint]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
       │   CPU: 3.00ms (0.04%), Scheduled: 3.00ms (0.01%), Blocked: 0.00ns (0.00%), Output: 24 rows (816B)
       │   Input avg.: 93.00 rows, Input std.dev.: 41.19%
       │   count := count("count_0")
       └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["year", "month"]]
          │   Layout: [year:varchar, month:varchar, count_0:bigint, $hashvalue:bigint]
          │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          │   CPU: 3.00ms (0.04%), Scheduled: 3.00ms (0.01%), Blocked: 3.32s (49.92%), Output: 372 rows (12.35kB)
          │   Input avg.: 93.00 rows, Input std.dev.: 4.09%
          └─ RemoteSource[sourceFragmentIds = [2]]
                 Layout: [year:varchar, month:varchar, count_0:bigint, $hashvalue_1:bigint]
                 CPU: 1.00ms (0.01%), Scheduled: 1.00ms (0.00%), Blocked: 3.33s (50.08%), Output: 372 rows (12.35kB)
                 Input avg.: 93.00 rows, Input std.dev.: 4.09%

Fragment 2 [SOURCE]
    CPU: 7.90s, Scheduled: 29.27s, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 36065 rows (21.17MB), Data Scanned: 21.17MB; per task: avg.: 36065.00 std.dev.: 0.00, Output: 372 rows (12.35kB)
    Output layout: [year, month, count_0, $hashvalue_2]
    Output partitioning: HASH [year, month][$hashvalue_2]
    Aggregate[type = PARTIAL, keys = [year, month], hash = [$hashvalue_2]]
    │   Layout: [year:varchar, month:varchar, $hashvalue_2:bigint, count_0:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
    │   CPU: 53.00ms (0.67%), Scheduled: 61.00ms (0.21%), Blocked: 0.00ns (0.00%), Output: 372 rows (12.35kB)
    │   Input avg.: 96.95 rows, Input std.dev.: 87.35%
    │   count_0 := count(*)
    └─ ScanProject[table = awsdatacatalog:devioosaka:cm_iwata_athena_test, projectLocality = LOCAL, protectedBarrier = NONE]
           Layout: [year:varchar, month:varchar, $hashvalue_2:bigint]
           Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           CPU: 7.84s (99.23%), Scheduled: 29.21s (99.76%), Blocked: 0.00ns (0.00%), Output: 36065 rows (880.49kB)
           Input avg.: 96.95 rows, Input std.dev.: 87.35%
           $hashvalue_2 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("year"), 0)), COALESCE("$operator$hash_code"("month"), 0))
           month := month:string:PARTITION_KEY
               :: [[01], [02], [03], [04], [05], [06], [07], [08], [09], [10], [11], [12]]
           year := year:string:PARTITION_KEY
               :: [[2019], [2020]]
           Input: 36065 rows (21.17MB), Filtered: 0.00%, Physical input: 21.17MB, Physical input time: 0.00ns

21.17MBのデータをスキャンしていることが分かります。

yearとmonthを1つずつ条件指定

続いて2019年12月と2020年1月のデータを対象に絞り込んでみます。この際year=...month=...といった具合に特に関数は使わず地道に1つづつ条件を指定していきます。

explain analyze
select count(*) year,
	month
from cm_iwata_athena_test
where (
		(
			year = '2019'
			and month = '12'
		)
		or (
			year = '2020'
			and month = '01'
		)
	)
group by year,
	month

実行結果です

Query Plan
Queued: 170.82us, Analysis: 146.35ms, Planning: 285.52ms, Execution: 750.49ms
Fragment 1 [HASH]
    CPU: 3.93ms, Scheduled: 4.02ms, Blocked 2.77s (Input: 1.39s, Output: 0.00ns), Input: 47 rows (1.56kB), Data Scanned: 0B; per task: avg.: 47.00 std.dev.: 0.00, Output: 2 rows (32B)
    Output layout: [month, count]
    Output partitioning: SINGLE []
    Project[projectLocality = LOCAL, protectedBarrier = NONE]
    │   Layout: [month:varchar, count:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
    │   CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 0.00ns (0.00%), Output: 2 rows (32B)
    │   Input avg.: 0.50 rows, Input std.dev.: 100.00%
    └─ Aggregate[type = FINAL, keys = [year, month], hash = [$hashvalue]]
       │   Layout: [year:varchar, month:varchar, $hashvalue:bigint, count:bigint]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
       │   CPU: 1.00ms (0.04%), Scheduled: 1.00ms (0.02%), Blocked: 0.00ns (0.00%), Output: 2 rows (68B)
       │   Input avg.: 11.75 rows, Input std.dev.: 100.05%
       │   count := count("count_0")
       └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["year", "month"]]
          │   Layout: [year:varchar, month:varchar, count_0:bigint, $hashvalue:bigint]
          │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          │   CPU: 1.00ms (0.04%), Scheduled: 1.00ms (0.02%), Blocked: 1.39s (50.00%), Output: 47 rows (1.56kB)
          │   Input avg.: 11.75 rows, Input std.dev.: 13.95%
          └─ RemoteSource[sourceFragmentIds = [2]]
                 Layout: [year:varchar, month:varchar, count_0:bigint, $hashvalue_1:bigint]
                 CPU: 1.00ms (0.04%), Scheduled: 1.00ms (0.02%), Blocked: 1.39s (50.00%), Output: 47 rows (1.56kB)
                 Input avg.: 11.75 rows, Input std.dev.: 13.95%

Fragment 2 [SOURCE]
    CPU: 2.79s, Scheduled: 6.39s, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 6251 rows (4.84MB), Data Scanned: 4.84MB; per task: avg.: 6251.00 std.dev.: 0.00, Output: 47 rows (1.56kB)
    Output layout: [year, month, count_0, $hashvalue_2]
    Output partitioning: HASH [year, month][$hashvalue_2]
    Aggregate[type = PARTIAL, keys = [year, month], hash = [$hashvalue_2]]
    │   Layout: [year:varchar, month:varchar, $hashvalue_2:bigint, count_0:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
    │   CPU: 7.00ms (0.25%), Scheduled: 8.00ms (0.12%), Blocked: 0.00ns (0.00%), Output: 47 rows (1.56kB)
    │   Input avg.: 133.00 rows, Input std.dev.: 73.01%
    │   count_0 := count(*)
    └─ ScanFilterProject[table = awsdatacatalog:devioosaka:cm_iwata_athena_test, filterPredicate = ((("year" = VARCHAR '2019') OR ("month" = VARCHAR '01')) AND (("month" = VARCHAR '12') OR ("year" = VARCHAR '2020'))), projectLocality = LOCAL, protectedBarrier = NONE]
           Layout: [year:varchar, month:varchar, $hashvalue_2:bigint]
           Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           CPU: 2.78s (99.64%), Scheduled: 6.39s (99.83%), Blocked: 0.00ns (0.00%), Output: 6251 rows (152.61kB)
           Input avg.: 133.00 rows, Input std.dev.: 73.01%
           $hashvalue_2 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("year"), 0)), COALESCE("$operator$hash_code"("month"), 0))
           month := month:string:PARTITION_KEY
               :: [[01], [12]]
           year := year:string:PARTITION_KEY
               :: [[2019], [2020]]
           Input: 6251 rows (4.84MB), Filtered: 0.00%, Physical input: 4.84MB, Physical input time: 0.00ns

スキャン対象が4.84MBに減少し、以下の出力の通りアクセス対象のパーティションが絞られていることが分かります

month := month:string:PARTITION_KEY
               :: [[01], [12]]
           year := year:string:PARTITION_KEY
               :: [[2019], [2020]]

CONCATとBETWEENで範囲を指定してみる

続いてCONCATとBETWENNを使って日付=パーティションの範囲を指定してみます。

explain analyze
select count(*) year,
	month
from cm_iwata_athena_test
where concat(year, month) between '201912' and '202001'
group by year,
	month

実行結果です

Query Plan
Queued: 674.15us, Analysis: 175.82ms, Planning: 476.17ms, Execution: 907.41ms
Fragment 1 [HASH]
    CPU: 4.95ms, Scheduled: 5.05ms, Blocked 2.46s (Input: 1.22s, Output: 0.00ns), Input: 47 rows (1.56kB), Data Scanned: 0B; per task: avg.: 47.00 std.dev.: 0.00, Output: 2 rows (32B)
    Output layout: [month, count]
    Output partitioning: SINGLE []
    Project[projectLocality = LOCAL, protectedBarrier = NONE]
    │   Layout: [month:varchar, count:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
    │   CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 0.00ns (0.00%), Output: 2 rows (32B)
    │   Input avg.: 0.50 rows, Input std.dev.: 100.00%
    └─ Aggregate[type = FINAL, keys = [year, month], hash = [$hashvalue]]
       │   Layout: [year:varchar, month:varchar, $hashvalue:bigint, count:bigint]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
       │   CPU: 1.00ms (0.05%), Scheduled: 1.00ms (0.02%), Blocked: 0.00ns (0.00%), Output: 2 rows (68B)
       │   Input avg.: 11.75 rows, Input std.dev.: 100.05%
       │   count := count("count_0")
       └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["year", "month"]]
          │   Layout: [year:varchar, month:varchar, count_0:bigint, $hashvalue:bigint]
          │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          │   CPU: 1.00ms (0.05%), Scheduled: 1.00ms (0.02%), Blocked: 1.23s (50.20%), Output: 47 rows (1.56kB)
          │   Input avg.: 11.75 rows, Input std.dev.: 9.27%
          └─ RemoteSource[sourceFragmentIds = [2]]
                 Layout: [year:varchar, month:varchar, count_0:bigint, $hashvalue_1:bigint]
                 CPU: 1.00ms (0.05%), Scheduled: 1.00ms (0.02%), Blocked: 1.22s (49.80%), Output: 47 rows (1.56kB)
                 Input avg.: 11.75 rows, Input std.dev.: 9.27%

Fragment 2 [SOURCE]
    CPU: 1.87s, Scheduled: 5.78s, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 6251 rows (4.84MB), Data Scanned: 4.84MB; per task: avg.: 6251.00 std.dev.: 0.00, Output: 47 rows (1.56kB)
    Output layout: [year, month, count_0, $hashvalue_2]
    Output partitioning: HASH [year, month][$hashvalue_2]
    Aggregate[type = PARTIAL, keys = [year, month], hash = [$hashvalue_2]]
    │   Layout: [year:varchar, month:varchar, $hashvalue_2:bigint, count_0:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
    │   CPU: 13.00ms (0.70%), Scheduled: 16.00ms (0.28%), Blocked: 0.00ns (0.00%), Output: 47 rows (1.56kB)
    │   Input avg.: 133.00 rows, Input std.dev.: 73.01%
    │   count_0 := count(*)
    └─ ScanFilterProject[table = awsdatacatalog:devioosaka:cm_iwata_athena_test, filterPredicate = ((concat("year", "month") >= VARCHAR '201912') AND (concat("year", "month") <= VARCHAR '202001')), projectLocality = LOCAL, protectedBarrier = NONE]
           Layout: [year:varchar, month:varchar, $hashvalue_2:bigint]
           Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           CPU: 1.85s (99.14%), Scheduled: 5.76s (99.67%), Blocked: 0.00ns (0.00%), Output: 6251 rows (152.61kB)
           Input avg.: 133.00 rows, Input std.dev.: 73.01%
           $hashvalue_2 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("year"), 0)), COALESCE("$operator$hash_code"("month"), 0))
           month := month:string:PARTITION_KEY
               :: [[01], [12]]
           year := year:string:PARTITION_KEY
               :: [[2019], [2020]]
           Input: 6251 rows (4.84MB), Filtered: 0.00%, Physical input: 4.84MB, Physical input time: 0.00ns

先ほどと同様スキャン対象は4.84MBでした。CONCATを使っても問題なくパーティションを有効活用できてそうですね

date_parseしてみる

せっかくなのでもう少し試してみます。yearとmonthをdate_parseしてvarcharからtimestamp型にCASTするとどうでしょうか?

explain analyze
select count(*) year,
	month
from cm_iwata_athena_test
where date_parse(concat(year, month), '%Y%m') between cast('2019-12-01' as TIMESTAMP)
	and cast('2020-01-31' as TIMESTAMP)
group by year,
	month

実行結果です

Query Plan
Queued: 431.40us, Analysis: 175.55ms, Planning: 437.65ms, Execution: 848.79ms
Fragment 1 [HASH]
    CPU: 3.82ms, Scheduled: 3.92ms, Blocked 2.29s (Input: 1.14s, Output: 0.00ns), Input: 47 rows (1.56kB), Data Scanned: 0B; per task: avg.: 47.00 std.dev.: 0.00, Output: 2 rows (32B)
    Output layout: [month, count]
    Output partitioning: SINGLE []
    Project[projectLocality = LOCAL, protectedBarrier = NONE]
    │   Layout: [month:varchar, count:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
    │   CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 0.00ns (0.00%), Output: 2 rows (32B)
    │   Input avg.: 0.50 rows, Input std.dev.: 100.00%
    └─ Aggregate[type = FINAL, keys = [year, month], hash = [$hashvalue]]
       │   Layout: [year:varchar, month:varchar, $hashvalue:bigint, count:bigint]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
       │   CPU: 1.00ms (0.04%), Scheduled: 1.00ms (0.02%), Blocked: 0.00ns (0.00%), Output: 2 rows (68B)
       │   Input avg.: 11.75 rows, Input std.dev.: 100.05%
       │   count := count("count_0")
       └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["year", "month"]]
          │   Layout: [year:varchar, month:varchar, count_0:bigint, $hashvalue:bigint]
          │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          │   CPU: 1.00ms (0.04%), Scheduled: 1.00ms (0.02%), Blocked: 1.14s (50.00%), Output: 47 rows (1.56kB)
          │   Input avg.: 11.75 rows, Input std.dev.: 20.30%
          └─ RemoteSource[sourceFragmentIds = [2]]
                 Layout: [year:varchar, month:varchar, count_0:bigint, $hashvalue_1:bigint]
                 CPU: 1.00ms (0.04%), Scheduled: 1.00ms (0.02%), Blocked: 1.14s (50.00%), Output: 47 rows (1.56kB)
                 Input avg.: 11.75 rows, Input std.dev.: 20.30%

Fragment 2 [SOURCE]
    CPU: 2.24s, Scheduled: 5.33s, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 6251 rows (4.84MB), Data Scanned: 4.84MB; per task: avg.: 6251.00 std.dev.: 0.00, Output: 47 rows (1.56kB)
    Output layout: [year, month, count_0, $hashvalue_2]
    Output partitioning: HASH [year, month][$hashvalue_2]
    Aggregate[type = PARTIAL, keys = [year, month], hash = [$hashvalue_2]]
    │   Layout: [year:varchar, month:varchar, $hashvalue_2:bigint, count_0:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
    │   CPU: 8.00ms (0.36%), Scheduled: 8.00ms (0.15%), Blocked: 0.00ns (0.00%), Output: 47 rows (1.56kB)
    │   Input avg.: 133.00 rows, Input std.dev.: 73.01%
    │   count_0 := count(*)
    └─ ScanFilterProject[table = awsdatacatalog:devioosaka:cm_iwata_athena_test, filterPredicate = ((date_parse(concat("year", "month"), '%Y%m') >= TIMESTAMP '2019-12-01 00:00:00.000') AND (date_parse(concat("year", "month"), '%Y%m') <= TIMESTAMP '2020-01-31 00:00:00.000')), projectLocality = LOCAL, protectedBarrier = NONE]
           Layout: [year:varchar, month:varchar, $hashvalue_2:bigint]
           Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           CPU: 2.23s (99.51%), Scheduled: 5.33s (99.79%), Blocked: 0.00ns (0.00%), Output: 6251 rows (152.61kB)
           Input avg.: 133.00 rows, Input std.dev.: 73.01%
           $hashvalue_2 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("year"), 0)), COALESCE("$operator$hash_code"("month"), 0))
           month := month:string:PARTITION_KEY
               :: [[01], [12]]
           year := year:string:PARTITION_KEY
               :: [[2019], [2020]]
           Input: 6251 rows (4.84MB), Filtered: 0.00%, Physical input: 4.84MB, Physical input time: 0.00ns

こちらも問題なくパーティションを有効活用できています。

RANDOM()と組み合わせてみる

先ほどまではmonthが01のレコードを対象としていましたが、RANDOM()を使って求めたmonth + 1~10の整数が12になるレコードを対象として抽出してみます

explain analyze
select count(*) year,
	month
from cm_iwata_athena_test
where year = '2019'
	and cast(month as integer) + (CAST(floor(rand() * 10) + 1 AS integer)) = 12
group by year,
	month

実行結果です

Query Plan
Queued: 185.83us, Analysis: 148.58ms, Planning: 307.57ms, Execution: 983.94ms
Fragment 1 [HASH]
    CPU: 9.33ms, Scheduled: 9.63ms, Blocked 4.44s (Input: 2.22s, Output: 0.00ns), Input: 221 rows (7.34kB), Data Scanned: 0B; per task: avg.: 221.00 std.dev.: 0.00, Output: 10 rows (160B)
    Output layout: [month, count]
    Output partitioning: SINGLE []
    Project[projectLocality = LOCAL, protectedBarrier = NONE]
    │   Layout: [month:varchar, count:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
    │   CPU: 1.00ms (0.03%), Scheduled: 1.00ms (0.01%), Blocked: 0.00ns (0.00%), Output: 10 rows (160B)
    │   Input avg.: 2.50 rows, Input std.dev.: 34.64%
    └─ Aggregate[type = FINAL, keys = [year, month], hash = [$hashvalue]]
       │   Layout: [year:varchar, month:varchar, $hashvalue:bigint, count:bigint]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
       │   CPU: 3.00ms (0.09%), Scheduled: 3.00ms (0.02%), Blocked: 0.00ns (0.00%), Output: 10 rows (340B)
       │   Input avg.: 55.25 rows, Input std.dev.: 37.45%
       │   count := count("count_0")
       └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["year", "month"]]
          │   Layout: [year:varchar, month:varchar, count_0:bigint, $hashvalue:bigint]
          │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          │   CPU: 2.00ms (0.06%), Scheduled: 2.00ms (0.01%), Blocked: 2.22s (50.00%), Output: 221 rows (7.34kB)
          │   Input avg.: 55.25 rows, Input std.dev.: 15.27%
          └─ RemoteSource[sourceFragmentIds = [2]]
                 Layout: [year:varchar, month:varchar, count_0:bigint, $hashvalue_1:bigint]
                 CPU: 1.00ms (0.03%), Scheduled: 1.00ms (0.01%), Blocked: 2.22s (50.00%), Output: 221 rows (7.34kB)
                 Input avg.: 55.25 rows, Input std.dev.: 15.27%

Fragment 2 [SOURCE]
    CPU: 3.43s, Scheduled: 18.11s, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 27818 rows (14.36MB), Data Scanned: 14.36MB; per task: avg.: 27818.00 std.dev.: 0.00, Output: 221 rows (7.34kB)
    Output layout: [year, month, count_0, $hashvalue_2]
    Output partitioning: HASH [year, month][$hashvalue_2]
    Aggregate[type = PARTIAL, keys = [year, month], hash = [$hashvalue_2]]
    │   Layout: [year:varchar, month:varchar, $hashvalue_2:bigint, count_0:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
    │   CPU: 38.00ms (1.11%), Scheduled: 49.00ms (0.27%), Blocked: 0.00ns (0.00%), Output: 221 rows (7.34kB)
    │   Input avg.: 8.43 rows, Input std.dev.: 100.77%
    │   count_0 := count(*)
    └─ ScanFilterProject[table = awsdatacatalog:devioosaka:cm_iwata_athena_test, filterPredicate = ((CAST("month" AS integer) + CAST((floor((rand() * 1E1)) + 1E0) AS integer)) = 12), projectLocality = LOCAL, protectedBarrier = NONE]
           Layout: [year:varchar, month:varchar, $hashvalue_2:bigint]
           Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           CPU: 3.39s (98.69%), Scheduled: 18.06s (99.69%), Blocked: 0.00ns (0.00%), Output: 2470 rows (60.30kB)
           Input avg.: 94.94 rows, Input std.dev.: 84.11%
           $hashvalue_2 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("year"), 0)), COALESCE("$operator$hash_code"("month"), 0))
           month := month:string:PARTITION_KEY
               :: [[01], [02], [03], [04], [05], [06], [07], [08], [09], [10], [11], [12]]
           year := year:string:PARTITION_KEY
               :: [[2019]]
           Input: 27818 rows (14.36MB), Filtered: 91.12%, Physical input: 14.36MB, Physical input time: 0.00ns

今度はyear=2019のかつmonthが1~12の合計12パーティションにアクセスしていることが分かります。スキャン量も14.36MBに増えています。RANDOM()の結果が確定しないとどのパーティションを読み込むべきかAthenaのエンジンからは判断できないのでこれは妥当な動きでしょう。

まとめ

Athenaでクエリを実行する際WHERE句に関数を使ってもいい感じにパーティションを使ってくれることを確認しました。ちゃんとオプティマイザが最適化してくれることが分かって一安心です。とはいえあまり複雑なことをし過ぎると意図通りのアクセスパスにならない可能性もあるので、実際にクエリを書く際は「Athena破産」することが無いようにしっかりEXPLAIN ANALYZEの結果を確認しながらクエリを構築していきましょう。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.